Core: Add PartitioningWriter #3164
Conversation
```java
}

private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
  Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
```
I think it's okay now, don't have to address that comment.
We could make a PartitionMap class that works like PartitionSet for this. No need to do it right now though. I agree that we should move forward with this implementation and update it later if needed.
Yeah, PartitionMap could be a solution here.
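For illustration, a minimal sketch of what such a PartitionMap could look like, built on the existing StructLikeMap. The class name and shape are hypothetical, not a committed design:

```java
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.StructLikeMap;

// Hypothetical PartitionMap: a two-level map keyed by spec ID and then by
// partition value, mirroring how PartitionSet tracks (specId, partition) pairs.
public class PartitionMap<V> {
  private final Map<Integer, Map<StructLike, V>> maps = Maps.newHashMap();

  public V get(PartitionSpec spec, StructLike partition) {
    Map<StructLike, V> specMap = maps.get(spec.specId());
    return specMap != null ? specMap.get(partition) : null;
  }

  public void put(PartitionSpec spec, StructLike partition, V value) {
    maps.computeIfAbsent(spec.specId(), id -> StructLikeMap.create(spec.partitionType()))
        .put(partition, value);
  }
}
```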
```java
  writer.write(row);
}

private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
```
Compared to the old implementation, we have an extra computeIfAbsent call and use StructLikeMap instead of a regular map. The performance hit seems negligible according to the benchmark results.
I went ahead and added benchmarks to this PR.
```groovy
}

ext {
  jmhVersion = '1.21'
```
Somehow, this did not seem to have any effect. I had to move it to the jmh block.
```java
    writerFactory, fileFactory, table.io(),
    fileFormat, TARGET_FILE_SIZE);

writer.close();
```
Why do we close the writer twice?
To make sure it is idempotent. Spark may call close multiple times.
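A minimal sketch of the idempotent-close pattern described here; the guard-flag structure is an illustrative assumption, not the PR's exact code:

```java
import java.io.Closeable;
import java.io.IOException;

// Illustrative idempotent close: a guard flag makes repeated close() calls safe,
// since callers such as Spark may invoke close() more than once.
abstract class IdempotentWriter implements Closeable {
  private boolean closed = false;

  @Override
  public void close() throws IOException {
    if (!closed) {
      closeResources(); // release open files, flush buffers, etc.
      closed = true;
    }
    // subsequent calls are no-ops
  }

  protected abstract void closeResources() throws IOException;
}
```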
```java
DeleteWriteResult result = writer.result();
Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size());
Assert.assertEquals("Must not reference data files", 0, writer.result().referencedDataFiles().size());
```
What's the difference between this check and the one beneath it?
Just checking that referencesDataFiles() is consistent with the number of referenced data files reported.
Thanks @aokolnychyi for pinging me, let me take a look today!
```java
}

if (completedSpecIds.contains(spec.specId())) {
  throw new IllegalStateException("Already closed files for spec: " + spec.specId());
```
I would prefer to include the partition spec together with the spec ID (rather than only the spec ID) in the IllegalStateException message. I've seen many users ask what is wrong when they hit the message 'Already closed files for partition ...' when it is actually just a sort issue. What I am trying to say is: it's quite hard for Iceberg beginners to get the meaning of 'Already closed files for spec: 3' if we keep the current message.
Maybe it is the right time to add a longer error message that clarifies what happened. I'll look into that.
Yeah, I agree with @openinx here. This is a good opportunity to improve that error message. Now that this is the clustered writer, we can say that incoming records need to be clustered by partition. You can use PartitionSet for this so it's really easy to track.
Also better to use a string representation of the spec rather than the spec ID.
The new exception looks like this:

```
java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'data=aaa' in spec [
  1000: data: identity(2)
]
```
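For illustration, a hedged sketch of how a clustered writer could use PartitionSet to detect this condition and raise the message above. Class and method names are assumptions, not the PR's exact code:

```java
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.PartitionSet;

// Sketch: track (specId, partition) pairs whose files were already closed and
// fail fast with a descriptive message when a record arrives for one of them.
class ClusteredWriterSketch {
  private final PartitionSet completedPartitions;

  ClusteredWriterSketch(Map<Integer, PartitionSpec> specsById) {
    this.completedPartitions = PartitionSet.create(specsById);
  }

  // called whenever the incoming (spec, partition) differs from the current one
  void checkNotAlreadyClosed(PartitionSpec spec, StructLike partition) {
    if (completedPartitions.contains(spec.specId(), partition)) {
      throw new IllegalStateException(String.format(
          "Incoming records violate the writer assumption that records are clustered by spec " +
          "and by partition within each spec. Either cluster the incoming records or switch " +
          "to fanout writers.\nEncountered records that belong to already closed files:\n" +
          "partition '%s' in spec %s", spec.partitionToPath(partition), spec));
    }
  }

  void markCompleted(PartitionSpec spec, StructLike partition) {
    completedPartitions.add(spec.specId(), partition);
  }
}
```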
```java
  currentPartition = StructCopy.copy(partition);
  currentWriter = newWriter(currentSpec, currentPartition);

} else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
```
It will throw a NullPointerException if the partition is null, because the partitionComparator cannot compare null values, right? I remember we use a null partition value to unify the partitioned and unpartitioned writer code paths.
You are right that the comparator will throw an NPE, but I think partition != currentPartition prevents us from calling the comparator whenever at least one value is null. A partition can be null only for unpartitioned specs, and as long as we are writing unpartitioned records, partition != currentPartition will be false.
Whenever partition != currentPartition and at least one of them is null, it means we are changing the spec. If so, it will be handled by the if block above and we won't call the comparator at all.
I agree. If currentPartition is null (as it is initialized) and a non-null partition is passed in, then the first check is true and the second check runs, which will pass both to the comparator. If we don't think that the comparator can handle null then we should update this.
@rdblue, in the use case you mention, this if branch won't be invoked as the one above it will work. This if branch is only tested when we wrote at least a record and the new record belongs to the same spec as the previous record. That means if one partition is null, the second must be too, so partition != currentPartition is false and the comparator is not used.
This is something that will be invoked for every row so I would like to avoid any extra checks.
Okay, got it. That sounds fine.
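To summarize the thread, here is a minimal sketch of the routing logic under discussion, showing why the comparator never sees a null partition. Field and method names are modeled on the diff context and are not the PR's exact code:

```java
import java.util.Comparator;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Comparators;

// Sketch: route rows to the current writer, rolling to a new writer whenever
// the spec or the partition changes.
abstract class ClusteredRoutingSketch<T> {
  private PartitionSpec currentSpec = null;
  private StructLike currentPartition = null;
  private Comparator<StructLike> partitionComparator = null;
  private RowWriter<T> currentWriter = null;

  void write(T row, PartitionSpec spec, StructLike partition) {
    if (currentSpec == null || spec.specId() != currentSpec.specId()) {
      // spec change: this branch also covers every case where exactly one of
      // partition/currentPartition is null, since a null partition can only
      // come from an unpartitioned spec
      closeCurrentWriter();
      this.currentSpec = spec;
      this.partitionComparator = Comparators.forType(spec.partitionType());
      this.currentPartition = partition; // the real code deep-copies the struct
      this.currentWriter = newWriter(spec, partition);
    } else if (partition != currentPartition &&
        partitionComparator.compare(partition, currentPartition) != 0) {
      // same spec, new partition: both values are non-null on this path, so the
      // comparator never sees null; for unpartitioned specs both values stay
      // null and the reference-equality check short-circuits the comparator
      closeCurrentWriter();
      this.currentPartition = partition;
      this.currentWriter = newWriter(spec, partition);
    }

    currentWriter.write(row);
  }

  abstract RowWriter<T> newWriter(PartitionSpec spec, StructLike partition);

  abstract void closeCurrentWriter(); // must tolerate no writer being open yet

  interface RowWriter<R> {
    void write(R row);
  }
}
```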
```java
@Override
protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
  // TODO: support ORC rolling writers
```
```java
Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
    spec.specId(),
    id -> StructLikeMap.create(spec.partitionType()));
FileWriter<T, R> writer = specWriters.get(partition);
```
For the fanout write path, this line is one of the hottest because it needs to compare the partition field values for every row. For an unpartitioned table, we also need to get the null key from the specWriters map. In the old implementation, we didn't need to get the writer from the map for unpartitioned tables. Is there any performance regression when comparing the two?
If I am not mistaken, we only use the fanout writer for partitioned tables. Even in the old implementation.
You are right that this is the place where we need attention. Like I said here, we have an extra computeIfAbsent call and use StructLikeMap instead of a regular map with PartitionKey. While the performance hit seems to be negligible according to the benchmark results I posted, I'd be up for optimizing this as much as possible.
One thing to consider is the performance of equals and hashCode in StructLikeWrapper vs PartitionKey. It is relatively simple and efficient in PartitionKey where we compare/iterate through object array. In the wrapper, these methods are more involved but don't seem drastically expensive.
One optimization idea is to introduce a cache of Comparators and JavaHash objects we use in the wrapper. At this point, we will create a comparator and a java hash for every partition we add to StructLikeMap. Even if we write to 1k partitions, I am not sure the difference is noticeable.
Another optimization idea is to introduce a new interface to indicate when a StructLike is backed by an array of values. If two structs implement that interface, we can just compare the arrays in StructLikeWrapper; a sketch of this idea follows below.
I am going to do a separate benchmark for HashMap with PartitionKey and StructLikeMap with PartitionKey.
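For illustration, the array-backed struct idea from the comment above could look roughly like this; the interface name and helper are hypothetical:

```java
import java.util.Arrays;

// Hypothetical marker interface for the optimization idea above: when both
// structs expose their backing value arrays, equality can compare the arrays
// directly instead of walking fields through StructLike accessors.
interface ArrayBackedStruct {
  Object[] backingArray();
}

final class StructEquality {
  private StructEquality() {
  }

  static boolean fastEquals(Object left, Object right) {
    if (left instanceof ArrayBackedStruct && right instanceof ArrayBackedStruct) {
      return Arrays.equals(
          ((ArrayBackedStruct) left).backingArray(),
          ((ArrayBackedStruct) right).backingArray());
    }
    return left.equals(right); // fall back to the regular field-by-field path
  }
}
```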
I did more benchmarks for 2.5 million records and 1k partitions, using the get/put methods heavily.
```java
// benchmark body: exercise StructLikeMap get/put with a reused PartitionKey
StructLikeMap<String> map = StructLikeMap.create(SPEC.partitionType());
PartitionKey partitionKey = new PartitionKey(SPEC, SCHEMA);
StructType dataSparkType = SparkSchemaUtil.convert(SCHEMA);
InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);

for (InternalRow row : rows) {
  partitionKey.partition(internalRowWrapper.wrap(row));
  String res = map.get(partitionKey);
  if (res == null) {
    // copy the key before inserting because partitionKey is reused across rows
    map.put(StructCopy.copy(partitionKey), "XXX");
  }
}

blackhole.consume(map);
```
Performance numbers came very close both time and memory-wise.
```
Benchmark                   Mode  Cnt  Score   Error  Units
MapBenchmark.hashMap          ss    5  0.274 ± 0.066   s/op
MapBenchmark.structLikeMap    ss    5  0.358 ± 0.056   s/op
```
Given such a minor difference for 2.5 million records, I'd say we should be good without any optimizations.
This may be because we cache the hash value in StructLikeWrapper.
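The hash-caching pattern being referred to looks roughly like this; a simplified sketch, not StructLikeWrapper's actual source:

```java
import java.util.Arrays;

// Simplified sketch of caching a lazily computed hash code: the expensive
// computation runs once, and every subsequent hashCode() call is a field read.
class CachedHashWrapper {
  private final Object[] values;
  private int hash = 0;
  private boolean hashIsZero = false; // distinguishes "not computed" from a real 0

  CachedHashWrapper(Object[] values) {
    this.values = values;
  }

  @Override
  public int hashCode() {
    int h = hash;
    if (h == 0 && !hashIsZero) {
      h = Arrays.hashCode(values);
      if (h == 0) {
        hashIsZero = true; // remember that the real hash is 0
      } else {
        hash = h; // cache for subsequent calls
      }
    }
    return h;
  }
}
```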
Thanks for the detailed explanation and performance report, @aokolnychyi!
RussellSpitzer left a comment:
Looks good to me, just a few comments
```java
if (completedPartitions.contains(partition)) {
  String path = spec.partitionToPath(partition);
  String errMsg = String.format("Already closed files for partition '%s' in spec %d", path, spec.specId());
  throw new IllegalStateException(errMsg);
```
Nit: do we need a variable for errMsg?
I am not a big fan of splitting lines, so I added an extra variable. This place has changed a little bit since then. Let me know what you currently think.
```java
() -> {
  try {
    writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
  } catch (IOException e) {
```
I don't think that write should throw IOException. We always wrap IOException in UncheckedIOException so it makes no sense for us to throw it from the writer interface.
I think I missed this when reviewing the FileWriter interfaces.
Somehow, I assumed our delete writers throw one. I'll update FileWriter and PartitioningWriter interfaces.
Okay, I think I remember now. Classes like PartitioningWriter close other writers, and close() throws a checked exception. I'll need to wrap such places and rethrow UncheckedIOException.
I'll do that in a follow-up.
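A minimal sketch of that follow-up wrapping, assuming a delegate writer whose close() throws a checked IOException (the helper name is hypothetical):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

final class CloseHelper {
  private CloseHelper() {
  }

  // Rethrow the checked IOException from a delegate's close() as an unchecked
  // exception so the writer interfaces can stay free of checked exceptions.
  static void closeUnchecked(Closeable closeable) {
    try {
      closeable.close();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close writer", e);
    }
  }
}
```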
Thanks for reviewing, @RussellSpitzer @openinx @rdblue!
This PR adds the PartitioningWriter interface and two implementations: ClusteredWriter and FanoutWriter. It is a subset of the changes in PR #2945.
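Based on this conversation, the interface shape is roughly the following; a sketch inferred from the discussion above, not the merged source:

```java
import java.io.Closeable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

// Sketch of a PartitioningWriter-style interface: callers pass the spec and
// partition with every row, and the implementation decides how to route it.
// ClusteredWriter expects input clustered by spec and partition and keeps a
// single file open at a time; FanoutWriter keeps a writer per seen partition.
// Note write(...) declares no checked IOException, per the review discussion.
interface PartitioningWriterSketch<T, R> extends Closeable {
  void write(T row, PartitionSpec spec, StructLike partition);

  R result(); // aggregated result, e.g. DataWriteResult or DeleteWriteResult
}
```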
PartitioningWriterinterface and two implementations:ClusteredWriterFanoutWriterIt is a subset of changes in PR #2945.